package com.tea.modules.producer;

import com.tea.modules.config.MyPartition;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * kafka生产者
 *
 * @author jaymin
 * @since 2022/1/3 20:53
 */
@Slf4j
public class ProducerDemo {
    /**
     * TOPIC名称
     */
    public static final String TOPIC_NAME = "my_topic";

    /**
     * 异步发送消息
     */
    public static void asyncSend() {
        KafkaProducer<String, String> kafkaProducer = getKafkaProducer();
        try {
            // ProducerRecorder
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "jay-" + i, "demo-" + i);
                kafkaProducer.send(producerRecord);
            }
        } finally {
            // 关闭kafka通道
            kafkaProducer.close();
        }
    }

    @NotNull
    private static KafkaProducer<String, String> getKafkaProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tea.modules.config.MyPartition");
        // config
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        return kafkaProducer;
    }

    /**
     * 异步阻塞发送
     */
    public static void syncSend() {
        KafkaProducer<String, String> kafkaProducer = getKafkaProducer();
        try {
            // ProducerRecorder
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "jay-" + i, "demo-" + i);
                Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord);
                // 发送完消息后等待回调
                RecordMetadata metadata = metadataFuture.get();
                System.out.println("partition:" + metadata.partition() + "|offset:" + metadata.offset());
            }
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭kafka通道
            kafkaProducer.close();
        }
    }

    /**
     * 异步回调发送
     */
    public static void sendAndCallback() {
        KafkaProducer<String, String> kafkaProducer = getKafkaProducer();
        try {
            // ProducerRecorder
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "jay-" + i, "demo-" + i);
                Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord, ((recordMetadata, exception) -> {
                    log.warn("recordMetadata:{},exception:{}", recordMetadata, exception);
                }));
            }
        } finally {
            // 关闭kafka通道
            kafkaProducer.close();
        }
    }

    public static void main(String[] args) {
//        asyncSend();
//        syncSend();
        sendAndCallback();
    }
}
